[core] Implement a thread pool and call the CPython API on all threads within the same concurrency group #52575


Merged: 21 commits into master, May 9, 2025

Conversation

kevin85421 (Member) commented Apr 24, 2025

Why are these changes needed?

We see the following error message from the CI runs of test_threaded_actor.py (example1, example2).


The message "Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread" looks alarming, but it does not cause any tests to fail.

The root cause is that PyGILState_Release is called on a thread that has never called PyGILState_Ensure. See the CPython source code for more details.

This happens because we can't control which thread in the thread pool runs the initializer/releaser. Hence, if a concurrency group has more than one thread, the error message above may be printed when we gracefully shut down an actor (i.e., via ray.actor.exit_actor()).

In this PR, we implement our own thread pool using std::thread, ensuring that both the initializer and the releaser run on the same thread. Consequently, from the Python interpreter’s perspective, all Python threads in the same concurrency group remain active even after they finish executing Ray tasks.
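For illustration, a rough sketch of the shape of such a pool (illustrative only; the class and member names below are not the PR's actual code): each std::thread runs the pool's event loop, calls the initializer on itself before taking any work, and calls the releaser on the same thread right before exiting.

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <functional>
#include <thread>
#include <vector>

// Sketch: every worker runs the initializer (e.g., CPython thread-state
// setup) on itself before processing tasks, and the releaser on the same
// thread before it exits. Members are public for brevity.
class PoolSketch {
 public:
  PoolSketch(int num_threads,
             std::function<void()> initializer,
             std::function<void()> releaser)
      : work_guard_(boost::asio::make_work_guard(io_context_)) {
    for (int i = 0; i < num_threads; i++) {
      threads_.emplace_back([this, initializer, releaser]() {
        initializer();      // Runs on this worker thread.
        io_context_.run();  // Blocks, executing posted work, until shutdown.
        releaser();         // Runs on the same worker thread.
      });
    }
  }

  void Post(std::function<void()> fn) {
    boost::asio::post(io_context_, std::move(fn));
  }

  void Join() {
    work_guard_.reset();  // Let run() return once queued work drains.
    for (auto &t : threads_) t.join();
  }

  boost::asio::io_context io_context_;
  boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
  std::vector<std::thread> threads_;
};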

Related issue number

Closes #51071

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(
The following script reproduces the issue:

# test.py
import ray

@ray.remote
class ThreadActor:
    def __init__(self):
        self.counter = 0

    def increment(self):
        self.counter += 1
        return self.counter

    def terminate(self):
        ray.actor.exit_actor()

actor = ThreadActor.options(max_concurrency=10).remote()
print(ray.get(actor.increment.remote()))
ray.get(actor.terminate.remote())
  • Without this PR: Ran the test 20 times and encountered the error "PyGILState_Release: auto-releasing thread-state" 20 times.
  • With this PR: Ran the test 20 times and encountered the error 0 times.

@kevin85421 kevin85421 added the go add ONLY when ready to merge, run all tests label Apr 24, 2025
edoakes (Collaborator) commented Apr 24, 2025

This means that users cannot access thread-local state when a concurrency group has more than one thread.

What will the behavior be if a user does try to access thread-local state currently? There are really only two acceptable options here IMO, unless there is a significant reason otherwise:

  1. Enforce that users cannot use thread-local state and document this limitation clearly. Not sure if it's possible to prevent them from using it, so this is only acceptable if the Python behavior is well-defined when a user tries to use an unsupported call from an uninitialized thread.
  2. Fix the problem for multiple threads as well.

Is there a reason why we can't fix the underlying problem and ensure that the initializer is run once per thread and the same thread runs the corresponding release call? Naively it seems like this should only require keeping a map of thread_id -> releaser.

kevin85421 (Member, Author):

What will the behavior be if a user does try to access thread-local state currently?

  • Without this PR
    • It is OK to access thread-local state if the task is running on the MainThread or in a concurrency group that has only one thread.
    • For a concurrency group with multiple threads, it depends on whether the task that accesses the thread-local state (access_task) is running on the same thread as the task that sets it (set_task). However, several issues must be addressed to ensure that these two tasks run on the same thread.
      • User interface: we don't expose a user-facing API that lets users run a task on a specific thread within a concurrency group.
      • Ray Core C++: To the best of my knowledge, boost::asio::thread_pool doesn't expose an API that allows users to specify particular threads.

Is there a reason why we can't fix the underlying problem and ensure that the initializer is run once per thread and the same thread runs the corresponding release call? Naively it seems like this should only require keeping a map of thread_id -> releaser.

As I mentioned above, boost::asio::thread_pool doesn't expose an API that allows users to specify particular threads to the best of my knowledge.

edoakes (Collaborator) commented Apr 24, 2025

As I mentioned above, boost::asio::thread_pool doesn't expose an API that allows users to specify particular threads to the best of my knowledge.

Got it, in that case it sounds like we'll need to use the lower-level APIs to manage our own basic thread pool and implement the init/release logic. This should not be too challenging given how simple the usage in thread_pool.h is.

kevin85421 (Member, Author):

I think it's fine not to support thread-local state for concurrency groups with more than one thread. I remember discussing this behavior with @stephanie-wang several months ago when we were trying to move the RayCG execution loop to the main thread.

However, executing the initializer is not only for thread-local state; it also aligns Ray more closely with the Python interpreter's assumptions. That is, once a thread with a given thread ID exits, it cannot be restarted. If we want to run the initializer/releaser on each thread, we may need to get rid of the thread_pool and manage threads ourselves.

kevin85421 (Member, Author):

I just saw #52575 (comment) after I submitted #52575 (comment). Implementing our own thread pool makes sense. I want to confirm with you that the goal of implementing our own thread pool to initialize and release Python threads is not to support thread-local state; rather, it is to fulfill the Python interpreter's assumptions, as I mentioned in the previous comment. Users should still not use thread-local state for a concurrency group with multiple threads because of the user interface issue mentioned in #52575 (comment).

edoakes (Collaborator) commented Apr 24, 2025

Yes exactly. I agree we should not encourage users to do this, but we should fulfill the Python interpreter's assumptions. This will also avoid undefined behavior and/or scary stack traces like the one in this ticket.

As an example, there might be library code that uses thread local storage that users aren't even aware of. We would want to make sure that the code at least runs correctly and doesn't fail in unexpected & confusing ways.

kevin85421 (Member, Author):

@edoakes Is it okay to implement our own thread pool using a naive round-robin approach? If not, I'd prefer to merge this PR first and then follow up with another PR to implement it after my on-call rotation.

I took a look at the source code of the post function in boost::asio::thread_pool. Implementing the scheduler ourselves would not be trivial.

@kevin85421 kevin85421 marked this pull request as ready for review April 25, 2025 07:17
edoakes (Collaborator) commented Apr 25, 2025

The use case here is quite simple and the work is coarse-grained (task executions). We should be able to use an io_context as a basic queue and rely on it for scheduling.

Pseudocode:

// Start threads; each one runs the io_context's event loop.
for (int i = 0; i < num_threads; i++) {
  threads_.emplace_back([&io_context]() { io_context.run(); });
}

// Post work to the queue.
boost::asio::post(io_context, work_callback);

// Graceful shutdown.
io_context.stop();
for (auto &thread : threads_) {
  thread.join();
}
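One boost::asio detail worth noting alongside this sketch (a property of the library, not something specific to this PR): io_context::stop() makes run() return as soon as possible and abandons any still-queued handlers, whereas resetting an executor_work_guard lets run() drain the remaining queue and then return. The latter comes up later in this review around BoundedExecutor::Join():

// Keep run() from returning while the queue is momentarily empty.
auto work_guard = boost::asio::make_work_guard(io_context);

// Graceful drain: let run() return once all queued handlers finish.
work_guard.reset();

// Immediate shutdown: run() returns ASAP; queued handlers are dropped.
io_context.stop();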

@@ -236,6 +240,39 @@ def test_tasks_on_different_executors(self, ray_start_regular_shared):
assert value == "f2"


def test_multiple_threads_in_same_group(ray_start_regular_shared):
kevin85421 (Member, Author):

  1. We can't use threading.enumerate() to check the number of threads because the threads are not launched by Python.

  2. The threads are visible to py-spy because it reads thread information from the OS.

@kevin85421 kevin85421 changed the title [core] Fix "Fatal Python error: PyGILState_Release: auto-releasing thread-state, but no thread-state for this thread" [core] Implement a thread pool and call the CPython API on all threads within the same concurrency group May 1, 2025

/// Join the thread pool.
- void BoundedExecutor::Join() { pool_->join(); }
+ void BoundedExecutor::Join() {
+   work_guard_.reset();
kevin85421 (Member, Author) commented May 1, 2025:

Maintain the previous behavior. We can't assume that Join will always be called after Stop; therefore, we need to reset work_guard_ here as well.

It's fine to call work_guard_.reset() twice; the second call is a no-op.

kevin85421 added 4 commits May 1, 2025 12:34
kevin85421 (Member, Author):

@edoakes CI tests pass!

releaser();
}
});
init_future.wait();
Contributor:

I think instead of having this future that waits for each thread in sequence, you should kick off all threads and have a latch that the constructor waits on before exiting:
https://en.cppreference.com/w/cpp/thread/latch
In Boost, for C++17:
https://www.boost.org/doc/libs/1_88_0/boost/thread/latch.hpp
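For illustration, a minimal sketch of that suggestion using C++20's std::latch (the function name and structure here are hypothetical, not the PR's actual code):

#include <functional>
#include <latch>
#include <thread>
#include <vector>

// Kick off all worker threads at once; block until every thread has finished
// running its initializer, letting the initializers run in parallel.
void StartWorkers(int num_threads,
                  const std::function<void()> &initializer,
                  std::vector<std::thread> &threads) {
  std::latch init_done(num_threads);
  for (int i = 0; i < num_threads; i++) {
    threads.emplace_back([&initializer, &init_done]() {
      initializer();           // e.g., set up this thread's Python thread state.
      init_done.count_down();  // Signal that this thread is initialized.
      // ... the thread's normal work loop would run here ...
    });
  }
  init_done.wait();  // Return only after all initializers have run.
}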

Contributor:

We could also avoid implementing our own thread pool and keep using boost::asio::thread_pool with this. (Having each thread run an io_context when it has no actual I/O to do means we waste some time on epoll, etc.)

  • have a latch/barrier that waits for all of the init functions to start running (because each thread is blocked on the latch, you can guarantee that each init is posted to its own thread)
  • have a latch/barrier that waits for all of the init functions to finish

At the end:

  • call wait first to make sure all threads are idle, post releaser functions that wait on a latch/barrier again, and then join and stop

kevin85421 (Member, Author):

I think instead of having this future that waits for each thread in sequence, you should kick off all threads and have a latch that the constructor waits on before exiting:
https://en.cppreference.com/w/cpp/thread/latch
In Boost, for C++17:
https://www.boost.org/doc/libs/1_88_0/boost/thread/latch.hpp

Thanks! Happy to learn new C++ techniques! I’ll take a look. I’m not sure how much benefit running all threads in parallel will provide in Python if the initializer needs to acquire the GIL.

We could also avoid implementing our own thread pool and keep using boost::asio::thread_pool with this. (Having each thread run an io_context when it has no actual I/O to do means we waste some time on epoll, etc.)

This seems hacky to me. Is this a common usage pattern for boost::asio::thread_pool?

“because the thread is blocked on the latch you can guarantee that each of the inits will be posted to its own thread” => This relies on behavior that boost::asio neither guarantees nor documents: thread_pool::post() doesn’t guarantee which thread will execute a given handler.

post releaser functions that wait on a latch/barrier again, and then join and stop

We need to ensure that the releasers run on the same threads as the initializers that create them. I am not sure whether there is an easy way to do that or not.

It’s a bit over-engineered and may introduce subtle issues, in my opinion. If we don’t have strong evidence, I’d prefer to keep the current implementation. WDYT?

Contributor:

Unsure how common a usage pattern it is. We can ensure initializer/releaser pairs with a thread_id -> releaser map or something.

I'm OK with keeping the current, simpler implementation; I doubt the overhead of io_context really matters for us. Another option is implementing a lightweight thread pool with condition variables and a task queue (a rough sketch of that idea follows below) if working around asio::thread_pool with waits, latches, and thread IDs is rough.

Leaving decision to @edoakes
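For reference, the "condition variables + task queue" alternative might look roughly like the following generic sketch (not Ray's actual code; all names are illustrative):

#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// A lightweight thread pool: a mutex-protected task queue, a condition
// variable to wake idle workers, and a stop flag for shutdown.
class SimpleThreadPool {
 public:
  explicit SimpleThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; i++) {
      workers_.emplace_back([this]() { WorkLoop(); });
    }
  }

  void Post(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

  void Shutdown() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stopped_ = true;
    }
    cv_.notify_all();
    for (auto &t : workers_) t.join();
  }

 private:
  void WorkLoop() {
    while (true) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this]() { return stopped_ || !tasks_.empty(); });
        if (stopped_ && tasks_.empty()) return;  // Drain, then exit.
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // Run outside the lock so other workers can dequeue.
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool stopped_ = false;
  std::vector<std::thread> workers_;
};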

kevin85421 (Member, Author):

Leaving decision to @edoakes

Which decision are you referring to:

(1) the current implementation versus a “lightweight thread pool with condition variables and a task queue,” or
(2) the current implementation versus the boost::asio::thread_pool solution?

For (1), what benefits does the condition-variable solution offer? We can discuss whether it’s worth it or not. For (2), if there’s no strong evidence of benefits or a common usage pattern, it’s a strong no from my perspective.

Collaborator:

Per offline discussion, we'll go with the existing implementation in the PR (basic hand-rolled threadpool using io_context). The reasoning is:

  1. We aren't forcing asio threadpool into a model it isn't built for
  2. It's very simple, so without justification it's preferred over a more complex implementation that might be more optimal

@kevin85421 to run full suite of benchmarks to validate (2)

kevin85421 (Member, Author):

Use latch 1130e71

@dayshah dayshah self-assigned this May 5, 2025
edoakes pushed a commit that referenced this pull request May 5, 2025
In PR #52575, the debug build kept failing without any error messages.
When I changed the instance type from medium to large, all CI tests passed.
I suspect it may be an OOM issue.

kevin85421 (Member, Author) commented May 8, 2025

I did a benchmark comparing the io_context implementation and the condition-variable implementation.

4 threads, posting 1,000,000 tasks:

  • io_context: 881.9 ms (average of 10 runs)
  • cond var: 1105.8 ms (average of 10 runs)
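For context, a micro-benchmark of the io_context variant along these lines might look as follows (an assumed harness with no-op tasks and wall-clock timing; not the exact benchmark used here):

#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  constexpr int kNumThreads = 4;
  constexpr int kNumTasks = 1000000;

  boost::asio::io_context io_context;
  auto work_guard = boost::asio::make_work_guard(io_context);
  std::vector<std::thread> threads;
  for (int i = 0; i < kNumThreads; i++) {
    threads.emplace_back([&]() { io_context.run(); });
  }

  std::atomic<int> done{0};
  auto start = std::chrono::steady_clock::now();
  for (int i = 0; i < kNumTasks; i++) {
    boost::asio::post(io_context, [&done]() { done.fetch_add(1); });
  }
  // Drain: let run() return once the queue empties, then join all workers.
  work_guard.reset();
  for (auto &t : threads) t.join();
  auto end = std::chrono::steady_clock::now();

  // Elapsed time covers both posting and executing all tasks.
  std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count()
            << " ms, tasks run: " << done.load() << "\n";
}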

kevin85421 added 3 commits May 8, 2025 02:00
kevin85421 (Member, Author):

@edoakes there are still some comments that I haven't addressed. I will ping you when all comments are addressed.

kevin85421 added 6 commits May 8, 2025 13:51
kevin85421 (Member, Author):

@edoakes All comments have been addressed.

@edoakes edoakes merged commit 478877e into master May 9, 2025
5 checks passed
@edoakes edoakes deleted the laptop-ray3-20250423 branch May 9, 2025 16:27
Labels: community-backlog, go (add ONLY when ready to merge, run all tests)

Successfully merging this pull request may close these issues:

  • [core] Only one of the threads in a thread pool will be initialized as a long-running Python thread (#51071)

4 participants